package com.stay4it.rxjava;


import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

/**
 * RxJava线程调度Scheduler
 * 参考链接：http://blog.csdn.net/jdsjlzx/article/details/51685769
 * 
 * 使用ObserveOn和SubscribeOn操作符，你可以让Observable在一个特定的调度器上执行，
 * ObserveOn指示一个Observable在一个特定的调度器上调用观察者的onNext, onError和onCompleted方法，
 * SubscribeOn更进一步，它指示Observable将全部的处理过程（包括发射数据和通知）放在特定的调度器上执行。
 * 
 * 调度器的种类：https://mcxiaoke.gitbooks.io/rxdocs/content/Scheduler.html
 * 各种操作符的默认调度器 参考：https://mcxiaoke.gitbooks.io/rxdocs/content/Scheduler.html
 */
public class RxJava13 {

	private static void test1() throws InterruptedException {
		Observable.just(100)
		.doOnSubscribe(new Action0() {
			
			@Override
			public void call() {
				System.out.println("doOnSubscribe  " + Thread.currentThread().getName());
			}
		})
		.subscribeOn(Schedulers.computation()) //切换线程,如果不调用subscribeOn方法，各个操作符将运行在在自己默认的调度器上
		.map(new Func1<Integer, Integer>() {

			@Override
			public Integer call(Integer t) {
				System.out.println("map1  " + Thread.currentThread().getName());
				return t*5;
			}
		})
		.observeOn(Schedulers.io()) //切换线程
		.map(new Func1<Integer, Integer>() {

			@Override
			public Integer call(Integer t) {
				System.out.println("map2  " + Thread.currentThread().getName());
				return t/2;
			}
		})
		.observeOn(Schedulers.newThread())  //切换线程
		.map(new Func1<Integer, Integer>() {

			@Override
			public Integer call(Integer t) {
				System.out.println("map3  " + Thread.currentThread().getName());
				return t+1;
			}
		})
		.subscribeOn(Schedulers.io())        //observeOn之后再调用subscribeOn无效
		.subscribe(new Action1<Integer>() {

			@Override
			public void call(Integer t) {
				System.out.println("subscribe  " + Thread.currentThread().getName());
				System.out.println("call value = " + t);
			}
		});
		
		Thread.sleep(2000);
	}
	
	
	public static void main(String[] args) throws InterruptedException {
		test1();

	}

}
